Add new functionalities to reset offsets #194
Merged
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
IMPORTANT: Please create an issue first before opening a Pull Request.
Linked to issue(s): #146
What changes were proposed in this pull request?
Checking offsets
You can now check where you are on the different queues, that is retrieving the offsets for each topic that you are polling:
In this example, I have two topic,
fink_sso_ztf_candidates_ztf
andfink_sso_fink_candidates_ztf
.For the first topic, there is one active partition on the remote Kafka cluster that served data (number
[4]
). I polled 1 alert (Committed
), and there are972
remaining alerts to be polled (Lag
). As there is only one active partition on the remote Kafka cluster, the total is the same (there could be up to 10 active partitions). For the second topic, I did not start polling as0
alert has beenCommitted
.Resetting offsets
Sometimes you might want to poll again alerts, that is restarting to poll from the beginning of a queue. For this, you can use:
fink_consumer --display -start_at earliest Resetting offsets to BEGINNING ... assign TopicPartition{topic=fink_sso_fink_candidates_ztf,partition=0,offset=0,leader_epoch=None,error=None} ... assign TopicPartition{topic=fink_sso_ztf_candidates_ztf,partition=0,offset=0,leader_epoch=None,error=None} ... # poll restarts at the first offset
All your topic partitions will be reset to the starting offset (
0
in this case). Similarly, you can empty all topics, and restarting polling from the last offset:Empty partitions will have
offset=0
, but others will have their offset to the latest one. The client will then wait for new data to come. Note that the reset will be actually triggered on the next poll. Hence the commandfink_consumer --display_statistics
will not right away display the reset offsets.This is particularly useful after a bug in the topic (malformed alerts pushed), and you want a fresh restart.
How was this patch tested?
Manual